package com.galeno.练习

import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, PreparedStatement, ResultSet}
import scala.collection.mutable.ListBuffer

/**
 * Geo-location integration job.
 *
 * Reads an application log (one JSON object per line), computes a 12-character geohash for
 * each record's `latitude`/`longitude`, and prints one tuple
 * `(deviceId, areaName, provinceName, parentId, areaGeoHash)` for every row of the MySQL
 * `area` table whose own geohash shares the first 10 characters with the record's geohash.
 *
 * The `area` table is read exactly once on the driver, indexed by geohash prefix and shipped
 * to the tasks as a broadcast variable, so no JDBC connection is opened per record and every
 * JDBC resource is closed deterministically.
 *
 * @author galeno
 * @date 2021/8/27 22:11
 */
object 地理位置集成 {

  import scala.util.Try

  /** Character length of every geohash computed by this job. */
  private val GeoHashPrecision = 12

  /** Number of leading geohash characters that must agree for a record to match an area. */
  private val MatchPrefixLength = 10

  /** One row of the `area` table together with the geohash of its BD09 coordinates. */
  private final case class Area(id: Int, name: String, parentId: Int, geoHash: String)

  /** (area name, province name, parent id, area geohash): a result row minus the device id. */
  private type AreaMatch = (String, String, String, String)

  /**
   * Entry point: runs the job and prints every match to standard output.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("地理位置集成")
    val sc = new SparkContext(conf)
    try {
      // Counts the lines dropped by parseDeviceGeo so that bad input is reported, not hidden.
      val skippedLines = sc.longAccumulator("skippedLogLines")

      val userAndGeo: RDD[(String, String)] =
        sc.textFile("F://applog/app_log_2021-06-08.log").flatMap { line =>
          val parsed = parseDeviceGeo(line)
          if (parsed.isEmpty) skippedLines.add(1)
          parsed.toList
        }

      // Loaded once on the driver; tasks only perform an in-memory prefix lookup.
      val areaIndex = sc.broadcast(buildPrefixIndex(loadAreas()))

      userAndGeo
        .flatMap { case (deviceId, geoCode) =>
          areaIndex.value
            .getOrElse(geoCode.take(MatchPrefixLength), Vector.empty[AreaMatch])
            .map { case (areaName, provinceName, parentId, areaGeoHash) =>
              (deviceId, areaName, provinceName, parentId, areaGeoHash)
            }
        }
        .foreach(println)

      if (skippedLines.sum > 0) {
        Console.err.println(s"Skipped ${skippedLines.sum} log line(s) without usable coordinates")
      }

      // NOTE(review): presumably keeps the local Spark UI reachable after the job finishes;
      // confirm whether this pause is still wanted.
      Thread.sleep(100000)
    } finally {
      sc.stop()
    }
  }

  /**
   * Extracts `(deviceId, geohash)` from one JSON log line.
   *
   * @param line raw log line
   * @return `None` when the line is not a JSON object, when `latitude` or `longitude` is
   *         missing or not numeric, or when the geohash library rejects the coordinates;
   *         `deviceId` is passed through unchanged and may be `null`
   */
  private def parseDeviceGeo(line: String): Option[(String, String)] =
    for {
      record <- Try(Option(JSON.parseObject(line))).toOption.flatten
      latitude <- readDouble(record.getString("latitude"))
      longitude <- readDouble(record.getString("longitude"))
      geoCode <- Try(
        GeoHash.geoHashStringWithCharacterPrecision(latitude, longitude, GeoHashPrecision)
      ).toOption
    } yield (record.getString("deviceId"), geoCode)

  /** Parses a possibly-null string as a `Double`, yielding `None` instead of throwing. */
  private def readDouble(raw: String): Option[Double] =
    Option(raw).flatMap(text => Try(text.toDouble).toOption)

  /**
   * Reads the whole `area` table and computes the geohash of every row.
   *
   * The result is fully materialized before the connection, statement and result set are
   * closed, so nothing lazy escapes the JDBC scope.
   *
   * @return all rows of `area`, in the order the database returned them
   */
  private def loadAreas(): Vector[Area] = {
    val connection =
      DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "root")
    try {
      val statement = connection.prepareStatement(
        "select ID, AREANAME, PARENTID, BD09_LNG, BD09_LAT FROM area"
      )
      try {
        val rows: ResultSet = statement.executeQuery()
        try {
          Iterator
            .continually(rows)
            .takeWhile(_.next())
            .map { row =>
              Area(
                id = row.getInt("ID"),
                name = row.getString("AREANAME"),
                parentId = row.getInt("PARENTID"),
                geoHash = GeoHash.geoHashStringWithCharacterPrecision(
                  row.getDouble("BD09_LAT"),
                  row.getDouble("BD09_LNG"),
                  GeoHashPrecision
                )
              )
            }
            .toVector
        } finally rows.close()
      } finally statement.close()
    } finally connection.close()
  }

  /**
   * Builds the lookup table used by the tasks: geohash prefix -> matches for that prefix.
   *
   * For each area, the row whose id equals `parentId / 10000 * 10000` supplies the second
   * name in the result (presumably the province-level row; confirm against the id scheme of
   * the `area` table). Areas for which no such row exists produce no match, mirroring an
   * empty result of the per-row `where id = ?` lookup this replaces.
   *
   * @param areas every row of the `area` table
   * @return strict, serializable map from the first `MatchPrefixLength` geohash characters
   *         to the matches sharing that prefix, in table order
   */
  private def buildPrefixIndex(areas: Vector[Area]): Map[String, Vector[AreaMatch]] = {
    // `.map` rather than `.mapValues`: the latter yields a lazy view that is not serializable
    // on Scala 2.12 and older.
    val namesById: Map[Int, Vector[String]] =
      areas.groupBy(_.id).map { case (id, rows) => id -> rows.map(_.name) }

    val keyedMatches: Vector[(String, AreaMatch)] =
      for {
        area <- areas
        provinceName <- namesById.getOrElse(area.parentId / 10000 * 10000, Vector.empty[String])
      } yield (
        area.geoHash.take(MatchPrefixLength),
        (area.name, provinceName, area.parentId.toString, area.geoHash)
      )

    keyedMatches.groupBy(_._1).map { case (prefix, hits) => prefix -> hits.map(_._2) }
  }

}
